FutureTask

FutureTask是J.U.C中的类,是一个可删除的异步计算类。这个类提供了Future接口的的基本实现,使用相关方法启动和取消计算,查询计算是否完成,并检索计算结果。只有在计算完成时才能使用get方法检索结果;如果计算尚未完成,get方法将会阻塞。一旦计算完成,计算就不能重新启动或取消(除非使用runAndReset方法调用计算)。

Runnable与Callable对比

通常实现一个线程我们会使用继承Thread的方式或者实现Runnable接口,这两种方式有一个共同的缺陷就是在执行完任务之后无法获取执行结果。从Java1.5之后就提供了Callable与Future,这两个接口就可以实现获取任务执行结果。

Runnable接口:代码非常简单,只有一个方法run

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

Callable泛型接口:有泛型参数,提供了一个call方法,执行后可返回传入的泛型参数类型的结果

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

Future接口

Future接口提供了一系列方法用于控制线程执行计算,如下:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);//取消任务
boolean isCancelled();//是否被取消
boolean isDone();//计算是否完成
V get() throws InterruptedException, ExecutionException;//获取计算结果,在执行过程中任务被阻塞
V get(long timeout, TimeUnit unit)//timeout等待时间、unit时间单位
throws InterruptedException, ExecutionException, TimeoutException;
}

使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FutureExample {

static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());//线程池提交任务
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();//获取不到一直阻塞
log.info("result:{}", result);
}
}

image-20190805185253727

FutureTask

Future实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable与Future接口,所以它既可以作为Runnable被线程中执行,又可以作为callable获得返回值。

1
2
3
4
5
6
7
public class FutureTask<V> implements RunnableFuture<V> {
...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

FutureTask支持两种参数类型,Callable和Runnable,在使用Runnable 时,还可以多指定一个返回结果类型。

1
2
3
4
5
6
7
8
9
10
11
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FutureTaskExample {

public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});

new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}

image-20190805185338831

ForkJoin

ForkJoin是Java7提供的一个并行执行任务的框架,是把大任务分割成若干个小任务,待小任务完成后将结果汇总成大任务结果的框架。主要采用的是工作窃取算法,工作窃取算法是指某个线程从其他队列里窃取任务来执行。
这里写图片描述

在窃取过程中两个线程会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常我们会使用双端队列来实现工作窃取算法。被窃取任务的线程永远从队列的头部拿取任务,窃取任务的线程从队列尾部拿取任务。

局限性:

1、任务只能使用fork和join作为同步机制,如果使用了其他同步机制,当他们在同步操作时,工作线程就不能执行其他任务了。比如在fork框架使任务进入了睡眠,那么在睡眠期间内在执行这个任务的线程将不会执行其他任务了。
2、我们所拆分的任务不应该去执行IO操作,如读和写数据文件。
3、任务不能抛出检查异常。必须通过必要的代码来处理他们。

框架核心:

核心有两个类:ForkJoinPool | ForkJoinTask
ForkJoinPool:负责来做实现,包括工作窃取算法、管理工作线程和提供关于任务的状态以及他们的执行信息。
ForkJoinTask:提供在任务中执行fork和join的机制。

使用方式:(模拟加和运算)

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask {

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static final int threshold = 2;//设定不大于两个数相加就直接for循环,不适用框架
private int start;
private int end;

public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算(分裂算法,可依情况调优)
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();

// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();

//生成一个计算任务,计算1+2+3+4...100
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);

try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}

BlockingQueue阻塞队列

主要应用场景:生产者消费者模型,是线程安全的

阻塞情况:

1、当队列满了进行入队操作
2、当队列空了的时候进行出队列操作

四套方法:

BlockingQueue提供了四套方法,分别来进行插入、移除、检查。每套方法在不能立刻执行时都有不同的反应。

image-20190805185508736




J.U.C之FutureTask

概述

由于线程是实现了Runnable接口或继承了Thread类,其执行后无法回调其线程的执行结果,因此JDK 1.5提供了一些新机制:接口Callable和Future。通过他们可以得到线程的执行结果。

Runnable与Callable对比

  1. Runnable接口只有一个run()方法;
  2. Callable是一个泛型接口,其中也只有V call()函数,其返回类型即传入的参数泛型。
  3. 两个接口功能相似,但后者因可以返回执行信息且支持泛型而更强大一些。

Future接口

通过实现该类可以得到一个异步计算的返回结果。并提供方法接口:检查是否计算完成、等待计算的完成、取回计算的结果、判断计算是否在完成前被正常地取消。

FutureTask类

它实现了RunnableFuture接口,而RunnableFuture接口同时继承了Runnable接口和Future接口。即FutureTask类最终也是执行的Callable的方法。组合Runnable和Future的好处:可以另起线程去专门检查并调取最终计算的结果,而其他的线程可以继续其他任务(只需监听该跑腿线程即可)。

演示例子-Callable和Future结合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class FutureExample {

static class MyCallable implements Callable<String> {

@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
运行结果
1
2
3
11:29:00.230 [main] INFO com.mmall.concurrency.aqs.FutureExample - do something in main
11:29:00.230 [pool-1-thread-1] INFO com.mmall.concurrency.aqs.FutureExample - do something in callable
11:29:05.234 [main] INFO com.mmall.concurrency.aqs.FutureExample - result:Done
例子分析
  1. 结合Callable和Future两者。
  2. 实现的Callable类可以被Executors的submit(xxx)提交给线程池运行。(之前提到过)
  3. 通过Future的get()查询并取回执行结果;若未执行结束则阻塞查询。(前面有说到)

演示例子-FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class FutureTaskExample {

public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});

new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}
运行结果

与上个例子结果一模一样,运行速度快了25%左右:Callable和Future结合使用耗时2s 450ms,FutureTask耗时1s 890ms。

看下源码

get()方法的内核:返回执行完成的结果或抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

还有一个可以指定返回执行结果的构造函数:(内部都是调用Executors的callable方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

还有一个指定超时时间的get():

1
2
3
4
5
6
7
8
9
10
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

当计算失败时(不包括计算被取消)被内部的run()调用:报告其中的异常

1
2
3
4
5
6
7
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

一旦计算完成,该计算就不能被重启或取消了,除非调用runAndReset()方法:(之前概述提到)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

J.U.C之ForkJoin

概述

ForkJoin框架是JDK 1.7提供的一种用于并行任务执行的框架。有些像Hadoop中的MapReduce机制。即将一个大任务分为(fork)多个小任务分别执行,最后将多个小任务的执行结果进行汇总(join)。

该框架采用的工作窃取算法,即当一个线程执行完它的任务后,可以从其他线程的任务队列尾部开始自行窃取任务进行执行,最后与该队列的另一个线程接头,以充分发挥该框架的优势(消除线程等待),提高效率,促进性能提升。
其中,每个线程的任务队列采用双端队列进行实现。
缺点:1. 当一个线程的双端队列中只有一个任务时,也会发生线程竞争。2. 由于使用双端队列,系统会分配更多的资源。

执行任务的局限性

  1. 任务只能使用Fork或Join作为同步机制;
  2. 线程队列中的任务不可以有IO操作;
  3. 任务不能抛出检查异常。(若有则需要必要的代码进行处理)

ForkJoinPool

看一下注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
* A {@code ForkJoinPool} provides the entry point for submissions
* from non-{@code ForkJoinTask} clients, as well as management and
* monitoring operations.
*
* <p>A {@code ForkJoinPool} differs from other kinds of {@link
* ExecutorService} mainly by virtue of employing
* <em>work-stealing</em>: all threads in the pool attempt to find and
* execute tasks submitted to the pool and/or created by other active
* tasks (eventually blocking waiting for work if none exist). This
* enables efficient processing when most tasks spawn other subtasks
* (as do most {@code ForkJoinTask}s), as well as when many small
* tasks are submitted to the pool from external clients. Especially
* when setting <em>asyncMode</em> to true in constructors, {@code
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*/

实质上,ForkJoinPool是一个从非ForkJoinTask的请求中,为运行中的ForkJoinTask的子任务提供切入点的ExecutorService。它不同于其他的ExecutorService,主要是它虚拟部署了任务窃取。尤其当在构造器中设置asyncMode为true时,ForkJoinPool也可能适当地使用事件模式的任务(从未被汇总过??)。
ForkJoinPool主要做实现:包括工作窃取算法、工作线程的管理、任务的状态管理、任务的执行信息等。

ForkJoinTask

看下注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* <p>A "main" {@code ForkJoinTask} begins execution when it is
* explicitly submitted to a {@link ForkJoinPool}, or, if not already
* engaged in a ForkJoin computation, commenced in the {@link
* ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
* related methods. Once started, it will usually in turn start other
* subtasks. As indicated by the name of this class, many programs
* using {@code ForkJoinTask} employ only methods {@link #fork} and
* {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
* advanced usages, as well as extension mechanics that allow support
* of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
* restrictions (that are only partially statically enforceable)
* reflecting their main use as computational tasks calculating pure
* functions or operating on purely isolated objects. The primary
* coordination mechanisms are {@link #fork}, that arranges
* asynchronous execution, and {@link #join}, that doesn't proceed
* until the task's result has been computed. Computations should
* ideally avoid {@code synchronized} methods or blocks, and should
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling.
*/

ForkJoinTask:主要提供任务中fork/join的机制。

我:大家自己意会一下,但可千万不要言传哦~ 真的很容易看懂的~
读者:emmmmm,是挺容易的,因为它容易就容易在它容易它奶奶的腿……

演示例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

public static final int threshold = 2;
private int start;
private int end;

public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;

//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();

// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();

//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);

try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
运行结果:
1
14:58:35.442 [main] INFO com.mmall.concurrency.aqs.ForkJoinTaskExample - result:5050
例子分析
  1. 该测试类需要继承RecursiveTask类,即在任务fork中需要递归地拆分任务;
  2. ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);指定startend,其构造函数完成其任务的fork的具体实现,之后再将其子结果join并返回。
  3. 最后通过ForkJoinPool调用submit()执行该任务。
  4. 其中:在任务拆分(fork)中,声明了一个threshold(阈值),即指定任务不可拆分的界限。